mysql 1亿条数据带条件查询 数据库一亿条数据

您所在的位置:网站首页 mysql 1亿条数据 mysql 1亿条数据带条件查询 数据库一亿条数据

mysql 1亿条数据带条件查询 数据库一亿条数据

2024-05-28 05:44| 来源: 网络整理| 查看: 265

目标

最近公司要求做一个web版ETL工具,需要一次性查询出来任意一张表的所有的数据,有一张表到达了1亿两千万,所有经过探索采用了以下的方案

先说一下内存溢出的原因jdbc查询mysql时,默认会一次性将sql查询的数据全部从服务器加载到内存中,当数据过多时,导致内存溢出解决方式

1.设置每次从服务器加载到内存的条数 statement.setFetchSize(batchSize); 参考资料 jdbc官网:https://docs.oracle.com/database/121/JJDBC/resltset.htm#JJDBC28620 2.设置ResultSet对象的大小的限制 statement.setMaxRows(int i)

参考solr

参考solr源码时,发现solr为什么查询一亿多数据的时候,不会oom,平常8g内存 1千万数据就会oom了, 原来 solr 重写了一个Iterator 并且把jdbc 设置了每次读取数据库并不读取全部,而且读取一部分流

jdbc 的设置

Connection创建 Statement时,更改默认的配置 原先默认配置 的方法都能够实现任意的前后滚动,使用各种移动的ResultSet指针的方法,不能够释放内存,导致查询数据过多时,oom,所以修改为 CONCUR_READ_ONLY,参数只允许结果集的游标向下移动,移动的同时也就释放掉了内存

Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); statement.setFetchSize(batchSize); // 设置从数据库取得多行的行数大小 statement.setMaxRows(maxRows); // 将此 Statement 对象生成的所有 ResultSet 对象可以包含的最大行数限制设置为给定数解决方案(模仿solr 编写查询迭代器)

完整代码git版 https://gitee.com/hugo110/common-utils/blob/master/src/main/java/com/fanfan/demo/solrjdbc/JdbcDataSource.java JdbcDataSource和ResultSetIterator

package com.fanfan.demo.solrjdbc; import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import com.fanfan.demo.solrjdbc.model.DataBaseModel; import com.fanfan.utils.database.ConnectionUtil; import com.fanfan.utils.database.JdbcUtil; import lombok.Data; /** * 封装了 根据sql 封装的链接池 * * @ClassName JdbcDataSource * @author 于国帅 * @date 2019年2月21日 下午3:01:36 * */ //@Slf4j public class JdbcDataSource { private DataBaseModel databaseModel; // 链接数据的信息 public JdbcDataSource(DataBaseModel databaseModel) { this.databaseModel = databaseModel; } public static void main(String[] args) { DataBaseModel jdbcModel = new DataBaseModel(); jdbcModel.setDbDriver("com.mysql.jdbc.Driver"); jdbcModel.setDbUrl( "jdbc:mysql://127.0.0.1:3306/springboot-boot?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true&failOverReadOnly=false"); jdbcModel.setDbUserName("root"); jdbcModel.setDbPassword("root"); JdbcDataSource jdbcDataSource = new JdbcDataSource(jdbcModel); String sql = "select * from t_system_menu"; Iterator iterator = jdbcDataSource.getResultSetIterator(sql); int i = 0; while (iterator.hasNext()) { System.err.println(iterator.next()); i++; } System.err.println("总共+" + i); } // 支持查询 public Iterator getResultSetIterator(String querySql) { try { // 返回迭代器 // 初始化 Statement // 执行查询 return new ResultSetIterator(getConnection(), querySql); } catch (SQLException e) { return null; } } protected Connection getConnection() { // 初始化数据源 ConnectionUtil.init(databaseModel.getDbDriver(), databaseModel.getDbUrl(), databaseModel.getDbUserName(), databaseModel.getDbPassword()); // 初始化连接 Connection Connection connection = ConnectionUtil.createConnection(); // 返回对应的链接 // 如果使用连接池 ,那么把链接放到连接池里面等待使用 选择关闭就可以 return connection; } } /** * 封装大数据 Iterator 能够实现对应的数据源获取 * * @获取的model Map key 列名称,value 列数据 * @ClassName ResultSetIterator * @author 于国帅 * @date 2019年2月21日 下午3:02:48 * */ @Data class ResultSetIterator implements Iterator { private ResultSet resultSet; // 查询出来的结果集 private List fieldNameList; // 需要查询的字段的集合 public ResultSetIterator(Connection connection, String querySql) throws SQLException { // 初始化 Statement 防止又一次查询把之前的结果集关闭 Statement statement = JdbcUtil.createStatement(connection, 500, 0); // 执行查询 this.resultSet = JdbcUtil.executeStatement(statement, querySql); // 设置字段 this.fieldNameList = JdbcUtil.getFieldNameList(this.resultSet.getMetaData()); } @Override public boolean hasNext() { try { return resultSet.next(); } catch (SQLException e) { // 关闭链接 return false; } } /** * 返回当前结果集的一条 * * @Title next * @author 于国帅 * @date 2019年2月21日 下午4:09:05 * @return * @see java.util.Iterator#next() */ @Override public Map next() { Map rowMap = new HashMap(); try { for (String fieldName : this.fieldNameList) { // Use underlying database's type information except for BigDecimal and BigInteger // which cannot be serialized by JavaBin/XML. See SOLR-6165 Object value; value = this.resultSet.getObject(fieldName); if (value instanceof BigDecimal || value instanceof BigInteger) { rowMap.put(fieldName, value.toString()); } else { rowMap.put(fieldName, value); } } } catch (SQLException e) { return rowMap; } return rowMap; } }

DataBaseModel

package com.fanfan.demo.solrjdbc.model; import lombok.Data; /** * 链接数据库的model 属性 * * @ClassName DataBaseModel * @author 于国帅 * @date 2019年2月21日 下午2:56:37 * */ @Data public class DataBaseModel { // id private Long id; // 数据库的名称 private String dbName; // 数据库的类型 ,例如是mysql 还是oracle private String type; // 链接数据库的URL private String dbUrl; // 使用的驱动名称 private String dbDriver; // 用户名称 private String dbUserName; // 密码 private String dbPassword; }

工具类

package com.fanfan.utils.database; import java.sql.Connection; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.List; /** * 操作大数据 jdbc 工具类 * * @ClassName JdbcUtil * @author 于国帅 * @date 2019年2月21日 下午4:37:47 * */ public class JdbcUtil { /** * 获取当前结果集的所有字段名称 * * @Title getFieldNameList * @author 于国帅 * @date 2019年2月21日 下午4:14:18 * @param metaData * @return * @throws SQLException * List */ public static List getFieldNameList(ResultSetMetaData metaData) { List colNameList = new ArrayList(); try { int count = metaData.getColumnCount() + 1; for (int i = 1; i < count; i++) { colNameList.add(metaData.getColumnLabel(i)); } } catch (SQLException e) { return colNameList; } return colNameList; } /** * 获取 只能够向下移动的 Statement * * @Title createStatement * @author 于国帅 * @date 2019年2月21日 下午4:38:07 * @param connection * @param batchSize * @param maxRows * @return * @throws SQLException * Statement */ public static Statement createStatement(final Connection connection, final int batchSize, final int maxRows) throws SQLException { // TYPE_FORWARD_ONLY 而默认的TYPE_FORWARD_ONLY参数只允许结果集的游标向下移动。 在从ResultSet(结果集)中读取记录的时,对于访问过的记录就自动释放了内存。 // ResultSet.CONCUR_READ_ONLY 就是类似只读 属性,不可仪更改的啊!不能用结果集更新数据。 Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); statement.setFetchSize(batchSize); // 设置从数据库取得多行的行数大小 statement.setMaxRows(maxRows); // 将此 Statement 对象生成的所有 ResultSet 对象可以包含的最大行数限制设置为给定数 return statement; } public static ResultSet executeStatement(Statement statement, String query) throws SQLException { boolean resultSetAvailable = statement.execute(query); // 如果是查询的话返回true,如果是更新或插入的话就返回false了; while (!resultSetAvailable && statement.getUpdateCount() != -1) { resultSetAvailable = statement.getMoreResults(); // Statement提供了一个getMoreResults()的方法,该方法能将当前Statement "指针" 移动到下一个结果集. } if (resultSetAvailable) { return statement.getResultSet(); } return null; } }

ConnectionUtil

package com.fanfan.utils.database; import java.sql.Connection; import java.sql.Driver; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import com.mysql.jdbc.exceptions.jdbc4.CommunicationsException; /** * 数据库连接工具类 主要是创建连接 * * @author JueYue * @date 2014年12月21日 */ public class ConnectionUtil { private static Connection connection; private static String DB_DRIVER; private static String DB_URL; private static String DB_USER_NAME; private static String DB_PASSWORD; private static ThreadLocal connectionTl = new ThreadLocal(); private ConnectionUtil() { } private static ConnectionUtil instance; public static Statement createStatement() { try { /* DatabaseMetaData dbMeta = connectionTl.get().getMetaData(); //System.out.println("1111111111===="+dbMeta.supportsColumnAliasing()); */ return connectionTl.get().createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY); } catch (Exception e) { throw new RuntimeException("创建 Statement 发生异常", e); } } public static Connection createConnection() { try { return connectionTl.get(); } catch (Exception e) { throw new RuntimeException("创建 Statement 发生异常", e); } } public static Statement createThisStatement() { if (instance == null) { initThis(); } try { return connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY); } catch (Exception e) { throw new RuntimeException("创建 Statement 发生异常", e); } } private static void initThis() { try { Class.forName(DB_DRIVER); connection = DriverManager.getConnection(DB_URL, DB_USER_NAME, DB_PASSWORD); } catch (Exception e) { throw new RuntimeException("创建 Connection 发生异常", e); } } public static void init(String deiver, String url, String username, String passwd) { try { // System.out.println(System.currentTimeMillis()); Class.forName(deiver); // System.out.println(System.currentTimeMillis()); connectionTl.set(DriverManager.getConnection(url, username, passwd)); // System.out.println(System.currentTimeMillis()); } catch (Exception e) { throw new RuntimeException("创建 Connection 发生异常", e); } } public static void checkConnection(String deiver, String url, String username, String passwd) { try { Driver driver = (Driver) Class.forName(deiver).newInstance(); boolean urlflag = driver.acceptsURL(url); if (!urlflag) { throw new RuntimeException("请检查数据库地址!"); } DriverManager.getConnection(url, username, passwd); } catch (ClassNotFoundException e) { throw new RuntimeException("请检查数据库类型!", e); } catch (CommunicationsException e) { throw new RuntimeException("请检查数据库地址!", e); } catch (SQLException e) { throw new RuntimeException("请检查用户名或密码,是否正确!", e); } catch (Exception e) { throw new RuntimeException("请检查驱动类型与数据库地址是否对应!", e); } } public static void close() { try { if (!connectionTl.get().isClosed()) connectionTl.get().close(); } catch (Exception e) { throw new RuntimeException("关闭 Connection 发生异常", e); } } public static void closeThis() { try { if (!connection.isClosed()) connection.close(); } catch (Exception e) { throw new RuntimeException("关闭 Connection 发生异常", e); } } public static void setDB_DRIVER(String dB_DRIVER) { DB_DRIVER = dB_DRIVER; } public static void setDB_URL(String dB_URL) { DB_URL = dB_URL; } public static void setDB_USER_NAME(String dB_USER_NAME) { DB_USER_NAME = dB_USER_NAME; } public static void setDB_PASSWORD(String dB_PASSWORD) { DB_PASSWORD = dB_PASSWORD; } }知识参考1.ConnectioncreateStatement(int resultSetType, int resultSetConcurrency)

默认这两个参数的共同特点是允许结果集(ResultSet)的游标可以上下移动 加载大量数据后,访问过内存不会释放,过多导致OOM

2.StatementsetFetchSize

设置从数据库取得多行的行数大小

setMaxRows

将此 Statement 对象生成的所有 ResultSet 对象可以包含的最大行数限制设置为给定数

setFetchDirection

向驱动程序提供关于方向的提示,在使用此 Statement 对象创建的 ResultSet 对象中将按该方向处理行。可以设置的参数是ResultSet.FETCH_FORWARD 、ResultSet.FETCH_REVERSE 和 ResultSet.FETCH_UNKNOWN。

getUpdateCount

可以知道成功更新了多少记录,如果返回值为0,就说明没有1条记录被更新

getMoreResults

Statement提供了一个getMoreResults()的方法,该方法能将当前Statement “指针” 移动到下一个结果集.

execute

execute 方法返回一个 boolean 值,以指示第一个结果的形式。必须调用 getResultSet 或 getUpdateCount 方法来检索结果,并且必须调用 getMoreResults 移动到任何后面的结果。 如果第一个结果是 ResultSet 对象,则返回 true;如果第一个结果是更新计数或者没有结果,则返回 false 意思就是如果是查询的话返回true,如果是更新或插入的话就返回false了;

3.ResultSet3.1 变量 int FETCH_FORWARD = 1000; int FETCH_REVERSE = 1001; int FETCH_UNKNOWN = 1002; int TYPE_FORWARD_ONLY = 1003;(在从ResultSet(结果集)中读取记录的时,对于访问过的记录就自动释放了内存。 只能向前滚动(这是默认值)) int TYPE_SCROLL_INSENSITIVE = 1004;(这两个方法都能够实现任意的前后滚动,使用各种移动的ResultSet指针的方法。二者的区别在于前者对于修改不敏感,而后者对于修改敏感。) int TYPE_SCROLL_SENSITIVE = 1005;(这两个方法都能够实现任意的前后滚动,使用各种移动的ResultSet指针的方法。二者的区别在于前者对于修改不敏感,而后者对于修改敏感。) int CONCUR_READ_ONLY = 1007;(就是类似只读 属性,不可仪更改的啊!不能用结果集更新数据。只允许结果集的游标向下移动。) int CONCUR_UPDATABLE = 1008;(ResultSet对象可以执行数据库的新增、修改、和移除。) int HOLD_CURSORS_OVER_COMMIT = 1; int CLOSE_CURSORS_AT_COMMIT = 2; 方法

getMetaData

结果集对象ResultSet中提供的一些信息



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3